package com.bigdata.wsr.createTable.sink;

import com.bigdata.wsr.createTable.bean.FieldInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;

@Slf4j
public class DorisDdlUtil {
    public static String getDorisDdl(String tableName, List<FieldInfo> fieldInfoList, Map<String, String> keyFieldMap) {
        StringBuilder dorisDdl = new StringBuilder();
        dorisDdl.append("CREATE TABLE ").append(tableName).append("(").append("\n");
        int size = fieldInfoList.size();
        for (FieldInfo fieldInfo : fieldInfoList) {
            size--;
            if (size == 0) {
                dorisDdl.append(fieldInfo.getFieldName())
                        .append(" ")
                        .append(fieldInfo.getFieldType())
                        .append(" comment ")
                        .append("'")
                        .append(fieldInfo.getFieldComment())
                        .append("'")
                        .append("\n")
                        .append(") ENGINE=OLAP")
                        .append("\n");
            } else {
                dorisDdl.append(fieldInfo.getFieldName())
                        .append(" ")
                        .append(fieldInfo.getFieldType())
                        .append(" comment ")
                        .append("'")
                        .append(fieldInfo.getFieldComment())
                        .append("'")
                        .append(",")
                        .append("\n");
            }
        }
        String uniqueKeyField = MapUtils.getString(keyFieldMap, "uniqueKeyField");
        String primaryKeyField = MapUtils.getString(keyFieldMap, "primaryKeyField");
        String tableComment = MapUtils.getString(keyFieldMap, "tableComment");

        String dorisUniqueKey = null;
        if (uniqueKeyField != null) {
            dorisUniqueKey = uniqueKeyField;
        } else if (primaryKeyField != null) {
            dorisUniqueKey = primaryKeyField;
        }
        if (dorisUniqueKey != null) {
            String distributedField = dorisUniqueKey.replaceAll(",is_delete_doris", "");
            dorisDdl.append("UNIQUE KEY(").append(dorisUniqueKey).append(") ")
                    .append("COMMENT \"").append(tableComment).append("\"").append("\n")
                    .append("DISTRIBUTED BY HASH(").append(distributedField).append(") BUCKETS 3").append("\n");
        } else {
            dorisDdl.append("COMMENT \"").append(tableComment).append("\"").append("\n");
        }
        dorisDdl.append("PROPERTIES (\"replication_allocation\" = \"tag.location.default: 1\",\"in_memory\" = \"false\",\"storage_format\" = \"V2\")");
        return dorisDdl.toString();
    }


    /**
     * 对字段类型 做doris适配
     *
     * @param fieldInfoList      字段信息列表
     * @param dorisFieldInfoList 多丽丝字段信息列表
     */
    public static void fieldTypeAdaptor(List<FieldInfo> fieldInfoList, List<FieldInfo> dorisFieldInfoList) {
        for (FieldInfo fieldInfo : fieldInfoList) {
            //对字段类型 做适配
            String fieldType = fieldInfo.getFieldType().toLowerCase();
            if (fieldType.contains("enum")) {
                fieldType = "varchar(" + fieldType.length() * 2 + ")";
                fieldInfo.setFieldType(fieldType);
            } else if (fieldType.contains("decimal")) {
                String numStr = fieldType.substring(fieldType.indexOf("(") + 1, fieldType.indexOf(")"));
                String[] split = numStr.split(",");
                int pre = Integer.parseInt(split[0]);
                int suf = Integer.parseInt(split[1]);
                if (pre > 27) {
                    pre = 27;
                }
                if (suf > 9) {
                    suf = 9;
                }
                String dorisType = "decimal(" + pre + "," + suf + ")";
                fieldInfo.setFieldType(dorisType);
            } else if (fieldType.contains("timestamp")) {
                fieldInfo.setFieldType("datetime");
            } else if (fieldType.contains("varchar")) {
                String numStr = fieldType.substring(fieldType.indexOf("(") + 1, fieldType.indexOf(")"));
                //MySQL的长度以字符数计算，而DorisDB应该以字节计算，解决方案则在生成创建DorisDB的SQL时，长度设置为原基础上的3倍
                int len = Integer.parseInt(numStr) * 3;
                String dorisType = "varchar(" + len + ")";
                if (len >= 65535) {
                    dorisType = "string";
                }
                fieldInfo.setFieldType(dorisType);
            } else if (fieldType.contains("text")) {
                fieldInfo.setFieldType("string");
            }
            dorisFieldInfoList.add(fieldInfo);
        }
    }

    /**
     * 对字段顺序 做doris适配
     *
     * @param keyFieldMap              关键字段映射
     * @param dorisFieldInfoList       多丽丝字段信息列表
     * @param dorisFieldSortedInfoList 多丽丝字段排序信息列表
     */
    public static void fieldSortAdaptor(Map<String, String> keyFieldMap, List<FieldInfo> dorisFieldInfoList, List<FieldInfo> dorisFieldSortedInfoList) {
        String uniqueKeyField = MapUtils.getString(keyFieldMap, "uniqueKeyField");
        String primaryKeyField = MapUtils.getString(keyFieldMap, "primaryKeyField");
        LinkedHashSet<String> uniqueKeyFieldSet = new LinkedHashSet<>();
        if (uniqueKeyField != null) {
            if (uniqueKeyField.contains(",")) {
                String[] uniqueFieldArr = uniqueKeyField.split(",");
                uniqueKeyFieldSet.addAll(Arrays.asList(uniqueFieldArr));
            }
        } else if (primaryKeyField != null) {
            if (primaryKeyField.contains(",")) {
                String[] uniqueFieldArr = primaryKeyField.split(",");
                uniqueKeyFieldSet.addAll(Arrays.asList(uniqueFieldArr));
            }
        } else {
//            throw new AnalysisException("source table have no unique keys !");
            System.out.println("source table have no unique keys !");
        }
        //用于保存unique字段
        List<FieldInfo> removeFieldInfos = new ArrayList<>();
        for (String sortedUnique : uniqueKeyFieldSet) {
            for (FieldInfo fieldInfo : dorisFieldInfoList) {
                if (sortedUnique.equals(fieldInfo.getFieldName())) {
                    removeFieldInfos.add(fieldInfo);
                    dorisFieldSortedInfoList.add(fieldInfo);
                }
            }
        }
        for (FieldInfo removeFieldInfo : removeFieldInfos) {
            dorisFieldInfoList.remove(removeFieldInfo);
        }
        dorisFieldSortedInfoList.addAll(dorisFieldInfoList);
    }

    /**
     * doris 适配
     *
     * @param fieldInfoList            字段信息列表
     * @param keyFieldMap              索引字段或者唯一键字段
     * @param dorisFieldSortedInfoList 适配后的字段信息列表
     */
    public static void dorisAdaptor(List<FieldInfo> fieldInfoList, Map<String, String> keyFieldMap, List<FieldInfo> dorisFieldSortedInfoList) {
        List<FieldInfo> dorisFieldInfoList = new ArrayList<>();
        fieldTypeAdaptor(fieldInfoList, dorisFieldInfoList);
        fieldSortAdaptor(keyFieldMap, dorisFieldInfoList, dorisFieldSortedInfoList);
        dorisFieldInfoList.clear();
    }

    public static void autoCreateDorisTable(DataSource dorisDs, String tableName, String dorisDdl) {
        Connection conn = null;
        PreparedStatement stmt = null;
        try {
            conn = dorisDs.getConnection();
            stmt = conn.prepareStatement(dorisDdl);
            stmt.execute();
            log.info("doris table created success ----------> {}", tableName);
        } catch (Exception e) {
            log.info("doris table created failed ----------> {}", tableName);
            e.printStackTrace();
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            } catch (SQLException se2) {
                se2.printStackTrace();
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
    }
}
